package com.zwcl.common.mq;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.zwcl.common.core.config.RedisConfig;
import com.zwcl.common.core.exception.BaseException;
import com.zwcl.common.core.redis.RedisLockLua;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

import java.lang.reflect.ParameterizedType;
import java.nio.charset.Charset;

@Slf4j
public abstract class RocketMQListenerAwareNew<E> implements RocketMQListener<MessageExt>, InitializingBean {

    private Class messageType;
    private final String charset = "UTF-8";

    @Autowired
    private MqKeyGenerator keyGenerator;
    @Autowired
    private RedisLockLua redisLock;

//    @Override
//    public void onMessage(MessageExt message) {
//        log.info("进入到了消费消息");
//        Object resultObj = JSON.parseObject(new String(message.getBody(), Charset.forName(charset)), messageType);
//
//        //多实例时实现消息幂等
//        String bizId = ((BasePayload) resultObj).getBizId();
//        log.info("消息id:{}",bizId);
//        String lockKey = keyGenerator.getConsumerLockKey(bizId);
//        String value = IdWorker.getIdStr();
//
//        boolean lock = redisLock.lock(lockKey, value, RedisConfig.DEFAULT_LOCK_EXPIRED_TIME, 0L);
//        if(!lock){
//            throw new BaseException("Duplicate consumption: "+bizId);
//        }
//        try{
//            consumerMessage((E) ((BasePayload) resultObj).getMessage());
//        }catch (Exception e){
//            e.printStackTrace();
//        }finally {
//            redisLock.unlock(lockKey, value);
//        }
//    }

    @Override
    public void onMessage(MessageExt message) {
        log.info("进入到了消费消息");
        BasePayload resultObj = JSON.parseObject(new String(message.getBody(), Charset.forName(charset)), BasePayload.class);

        //多实例时实现消息幂等
        String bizId = resultObj.getBizId();
        log.info("消息id:{}",bizId);
        String lockKey = keyGenerator.getConsumerLockKey(bizId);
        String value = IdWorker.getIdStr();

        boolean lock = redisLock.lock(lockKey, value, RedisConfig.DEFAULT_LOCK_EXPIRED_TIME, 0L);
        if(!lock){
            throw new BaseException("Duplicate consumption: "+bizId);
        }
        try{
            consumerMessage((E) resultObj.getMessage());
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            redisLock.unlock(lockKey, value);
        }
    }


    /**
     * 消费消息
     * 注意: 一定要自己实现消息的幂等,rocketMQ只保证消息一定会被投递，不能保证消息只被投递一次
     * 1、建议通过message keys实现 2、数据库对业务key设置唯一索引
     * @param message
     */
    public abstract Boolean consumerMessage(E message);



    @Override
    public void afterPropertiesSet() {
        //TODO:先注释看看
        //this.messageType = getMessageType();
        //log.debug("RocketMQ messageType: {}", messageType.getName());
    }

    private Class getMessageType() {
        return (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }
}